package com.pms.utils;

import com.alibaba.fastjson.JSONArray;
import com.alibaba.fastjson.JSONException;
import com.alibaba.fastjson.JSONObject;
import com.pms.entity.BaseModel;
import com.pms.mapper.WaterPumpGroupAttrMapper;
import com.pms.mq.RabbitProducerTest;
import com.pms.service.IWaterPumpGroupService;
import com.pms.util.DateUtil;
import com.pms.web.WaterPumpWebSocketUtil;
import org.apache.commons.lang3.StringUtils;
import org.apache.kafka.clients.consumer.ConsumerRecord;
import org.apache.kafka.clients.consumer.ConsumerRecords;
import org.apache.kafka.clients.consumer.KafkaConsumer;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.beans.factory.annotation.Value;
import org.springframework.stereotype.Component;

import java.io.IOException;
import java.util.*;

/**
 * Consumes water-pump telemetry messages from Kafka and persists them to MySQL.
 *
 * <p>For every polled batch, each JSON record is turned into one row of
 * {@code water_pump_group_attr}. Rows are grouped by their column list so that
 * records reporting the same attributes share one multi-row {@code insert}.
 * Records whose {@code ZXGZ} attribute is non-zero are additionally written to
 * {@code water_pump_group_fault} and pushed to the RabbitMQ socket and SMS queues.
 *
 * <p>NOTE(review): the SQL is assembled as text and may contain several
 * {@code ;}-separated statements in one string; this presumably relies on the
 * JDBC connection allowing multi-statement execution - confirm. Because the SQL
 * is text, the device name is escaped and attribute names are validated before
 * they are embedded. Attribute values come from {@code KafkaMessageFormatUtil}
 * and are embedded as-is (assumed numeric - TODO confirm).
 *
 * Created by Administrator on 2018/1/11.
 */
//@Component
public class KafkaMessageToMysql {

    private static final Logger logger = LoggerFactory.getLogger(KafkaMessageToMysql.class);

    /** Date pattern used for every formatted timestamp written to the database. */
    private static final String DATE_TIME_PATTERN = "yyyy-MM-dd HH:mm:ss";

    /** Attribute whose non-zero raw value marks the record as a fault. */
    private static final String FAULT_ATTR_NAME = "ZXGZ";

    @Value("${kafka.topic.waterPump}")
    private String waterPumpTopic;
    @Autowired
    private WaterPumpGroupAttrMapper waterPumpAttrMapper;
    @Autowired
    private IWaterPumpGroupService waterPumpService;
    @Autowired
    private KafkaMessageFormatUtil kafkaMessageFormatUtil;
    @Autowired
    RabbitProducerTest rabbitProducerTest;

    /**
     * Subscribes the consumer to the water-pump topic and stores every polled
     * batch in the database. The loop never terminates normally; the consumer is
     * closed when an exception escapes the loop.
     *
     * @param consumer Kafka consumer to poll; it is closed by this method
     */
    public void saveWaterPumpAttrData(KafkaConsumer<String, String> consumer) {
        try {
            consumer.subscribe(Collections.singletonList(waterPumpTopic));
            while (true) {
                ConsumerRecords<String, String> records = consumer.poll(KafkaUtil.getTimeOutMs());
                // column list -> value tuples of all records sharing that column list
                Map<String, List<String>> rowsByColumns = new HashMap<String, List<String>>();
                // fully parsed records that reported a fault
                JSONArray faultRecords = new JSONArray();
                for (ConsumerRecord<String, String> record : records) {
                    String payload = record.value();
                    // Only JSON objects are processed; null values (tombstones) and other payloads are skipped.
                    if (payload == null || !payload.startsWith("{")) {
                        continue;
                    }
                    collectRecord(payload, rowsByColumns, faultRecords);
                }
                // Nothing parsed successfully: do not send an empty statement to the database.
                if (rowsByColumns.isEmpty()) {
                    continue;
                }
                storeAttrRows(rowsByColumns);
                if (!faultRecords.isEmpty()) {
                    storeFaultRows(faultRecords);
                    String pushIotFaultMessage = faultRecords.toJSONString();
                    rabbitProducerTest.SocketQueuePushMessage(pushIotFaultMessage);
                    rabbitProducerTest.SMSQueuePushMessage(pushIotFaultMessage);
                }
            }
        } finally {
            consumer.close();
        }
    }

    /**
     * Parses one Kafka message and, only if it is complete and valid, adds its
     * row to {@code rowsByColumns} and (for faults) its JSON to {@code faultRecords}.
     * A malformed message is logged and contributes nothing to the batch.
     */
    private void collectRecord(String payload, Map<String, List<String>> rowsByColumns, JSONArray faultRecords) {
        JSONObject attrJson = new JSONObject();
        StringBuilder columns = new StringBuilder(80);
        StringBuilder values = new StringBuilder(80);
        boolean fault = false;
        try {
            JSONObject message = JSONObject.parseObject(payload);
            // device status
            Integer deviceStatus = message.getJSONObject("body").getJSONObject("metadata").getInteger("cr_device_status");
            // requestID carries "<deviceName>/<reportTime>"
            String requestId = message.getString("requestID");
            if (StringUtils.isBlank(requestId)) {
                logger.info("kafka上报的信息:设备名称及上报时间为空");
                return;
            }
            JSONArray dataArray = message.getJSONObject("tsdb").getJSONArray("data");
            String[] requestIdParts = requestId.split("/");
            // A requestID made only of '/' splits into an empty array: there is no device name.
            if (requestIdParts.length == 0) {
                logger.info("kafka上报的信息:设备名称及上报时间为空");
                return;
            }
            String deviceName = requestIdParts[0];

            Long waterPumpGroupId = BaseModel.returnStaticIdLong();
            columns.append("waterPumpGroupId").append(",");
            columns.append("deviceStatus").append(",");
            columns.append("deviceName");
            values.append(waterPumpGroupId).append(",");
            values.append(deviceStatus).append(",");
            values.append(quoteSqlString(deviceName));
            attrJson.put("waterPumpGroupId", waterPumpGroupId);
            attrJson.put("deviceStatus", deviceStatus);
            attrJson.put("deviceName", deviceName);

            if (requestIdParts.length > 1) {
                Long reportTime;
                try {
                    reportTime = Long.parseLong(requestIdParts[1]);
                } catch (NumberFormatException ex) {
                    logger.info("kafka上报的信息:上报时间格式出错");
                    return;
                }
                // If even twice the reported value is still before "now", treat it as
                // seconds rather than milliseconds and scale it up.
                if ((reportTime << 1) < System.currentTimeMillis()) {
                    reportTime = reportTime * 1000;
                }
                // Already wrapped in quotes so it can be embedded in SQL unchanged later.
                String reportTimeSql = "'" + DateUtil.longToString(reportTime, DATE_TIME_PATTERN) + "'";
                columns.append(",").append("reportTime");
                columns.append(",").append("createTime");
                columns.append(",").append("reportTimeFormat");
                // The reportTime column keeps the value exactly as reported (it parsed as a long above).
                values.append(",").append(requestIdParts[1]);
                values.append(",").append("'" + DateUtil.format(new Date(), DATE_TIME_PATTERN) + "'");
                values.append(",").append(reportTimeSql);
                attrJson.put("reportTime", reportTime);
                attrJson.put("reportTimeFormat", reportTimeSql);
            }

            // formatting rules are looked up by device name
            JSONObject ruleJson = waterPumpService.getMessageFormatRuleJson(deviceName);
            for (int i = 0, size = dataArray.size(); i < size; i++) {
                JSONObject item = dataArray.getJSONObject(i);
                String attrName = item.getString("name");
                // The name becomes a column identifier in the SQL text, so it must not
                // contain anything that could terminate or extend the statement.
                if (!isSafeColumnName(attrName)) {
                    logger.warn("kafka上报的信息:属性名称不合法:" + attrName);
                    return;
                }
                Integer rawValue = item.getInteger("val");
                Object value = kafkaMessageFormatUtil.kafkaMessageFormat(attrName, rawValue, ruleJson);
                columns.append(",").append(attrName);
                values.append(",").append(value);
                attrJson.put(attrName, value);
                // A missing fault value unboxes to a NullPointerException, which rejects
                // the record below (same outcome as before).
                if (FAULT_ATTR_NAME.equals(attrName) && rawValue != 0) {
                    fault = true;
                }
            }
            columns.append(",").append("isFault");
            values.append(",").append(fault ? 1 : 0);

            // Group the row with other rows that have exactly the same column list.
            String columnKey = columns.toString();
            List<String> rows = rowsByColumns.get(columnKey);
            if (rows == null) {
                rows = new ArrayList<String>();
                rowsByColumns.put(columnKey, rows);
            }
            rows.add(values.toString());
            // Only a completely parsed record may be stored and pushed as a fault.
            if (fault) {
                faultRecords.add(attrJson);
            }
        } catch (NullPointerException ex) {
            // A required JSON node (body/metadata/tsdb/data/val) is missing.
            logger.warn("kafka水泵数据处理空指针异常:" + ex.getMessage(), ex);
        } catch (JSONException ex) {
            // The payload is not valid JSON or a field has an unexpected type.
            logger.warn("kafka水泵数据处理json转换异常:" + ex.getMessage(), ex);
        }
    }

    /**
     * Builds one multi-row insert per distinct column list and executes them all
     * through the mapper in a single call. Storage failures are logged, not rethrown.
     */
    private void storeAttrRows(Map<String, List<String>> rowsByColumns) {
        StringBuilder sql = new StringBuilder(320);
        for (Map.Entry<String, List<String>> entry : rowsByColumns.entrySet()) {
            sql.append("insert into water_pump_group_attr(");
            sql.append(entry.getKey());
            sql.append(") values");
            List<String> rows = entry.getValue();
            for (int i = 0, size = rows.size(); i < size; i++) {
                if (i > 0) {
                    sql.append(",");
                }
                sql.append("(").append(rows.get(i)).append(")");
            }
            sql.append(";");
        }
        try {
            waterPumpAttrMapper.insertKafkaData(sql.toString());
        } catch (Exception ex) {
            logger.error("水泵消息存储异常:" + ex.getMessage(), ex);
        }
    }

    /**
     * Writes one {@code water_pump_group_fault} row per fault record. Every record
     * contributes its own insert statement to the SQL text, so no fault in the
     * batch is dropped. Storage failures are logged, not rethrown.
     */
    private void storeFaultRows(JSONArray faultRecords) {
        StringBuilder sql = new StringBuilder();
        for (int i = 0, size = faultRecords.size(); i < size; i++) {
            JSONObject faultJson = faultRecords.getJSONObject(i);
            sql.append("insert into water_pump_group_fault(");
            sql.append("waterPumpGroupId,ZXGZ,deviceName,reportTimeFormat,isSocketPush,isSmsPush,createTime");
            sql.append(") value(");
            sql.append(faultJson.getLongValue("waterPumpGroupId"));
            sql.append(",").append(faultJson.get("ZXGZ"));
            sql.append(",").append(quoteSqlString(faultJson.getString("deviceName")));
            // reportTimeFormat was stored already quoted; it is null (SQL NULL) when no time was reported.
            sql.append(",").append(faultJson.get("reportTimeFormat"));
            sql.append(",").append(0);// isSocketPush
            sql.append(",").append(0);// isSmsPush
            sql.append(",").append("'" + DateUtil.format(new Date(), DATE_TIME_PATTERN) + "'");// createTime
            sql.append(");");
        }
        try {
            waterPumpAttrMapper.insertKafkaData(sql.toString());
        } catch (Exception ex) {
            logger.error("水泵故障消息存储异常:" + ex.getMessage(), ex);
        }
    }

    /**
     * Wraps a value in single quotes for use as a SQL string literal, doubling
     * backslashes and single quotes so the value cannot end the literal early.
     * A {@code null} input yields {@code 'null'}, as plain concatenation would.
     * NOTE(review): backslash doubling assumes MySQL's default mode where
     * backslash is an escape character - confirm the server's sql_mode.
     */
    private static String quoteSqlString(String raw) {
        String text = String.valueOf(raw);
        return "'" + text.replace("\\", "\\\\").replace("'", "''") + "'";
    }

    /**
     * Returns whether the name consists only of characters MySQL permits in an
     * unquoted identifier: ASCII letters, digits, {@code _}, {@code $}, and
     * characters from U+0080 upwards.
     */
    private static boolean isSafeColumnName(String name) {
        if (name == null || name.isEmpty()) {
            return false;
        }
        for (int i = 0; i < name.length(); i++) {
            char c = name.charAt(i);
            boolean allowed = (c >= '0' && c <= '9')
                    || (c >= 'a' && c <= 'z')
                    || (c >= 'A' && c <= 'Z')
                    || c == '_'
                    || c == '$'
                    || c >= 0x80;
            if (!allowed) {
                return false;
            }
        }
        return true;
    }

    /**
     * @return the Kafka topic the water-pump consumer subscribes to
     */
    public String getWaterPumpTopic() {
        return waterPumpTopic;
    }

    /**
     * @param waterPumpTopic the Kafka topic the water-pump consumer subscribes to
     */
    public void setWaterPumpTopic(String waterPumpTopic) {
        this.waterPumpTopic = waterPumpTopic;
    }
}
